package cn.xshi.sentinel.util;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Properties;

@Component
@Slf4j
public class KafkaUtils {

    @Resource
    FlowRuleUtil flowRuleUtil;

    /**
     *
     * @return
     */
    private KafkaProducer<String, String> createProducer() {
        if(StringUtils.isBlank(flowRuleUtil.getSentinelKafkaAddress())){
            return null;
        }
        Properties props = new Properties();
        props.put("bootstrap.servers", flowRuleUtil.getSentinelKafkaAddress());
        props.put("acks", flowRuleUtil.getSentinelKafkaAcks());
        props.put("retries", flowRuleUtil.getSentinelKafkaRetries());
        props.put("batch.size", flowRuleUtil.getSentinelKafkaBatchSize());
        props.put("linger.ms", flowRuleUtil.getSentinelKafkaLingerMs());
        props.put("buffer.memory", flowRuleUtil.getSentinelKafkaBufferMemory());
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return new KafkaProducer<String, String>((props));
    }

    /**
     * 发送信息
     * @param topic
     * @param message
     */
    public void send(String topic, String message) {
        if(StringUtils.isBlank(topic)){
            log.error("topic为空");
            return;
        }
        Producer<String,String> producer = createProducer();
        if(null == producer){
            log.error("producer为空");
            return;
        }
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, message);
        try {
            producer.send(producerRecord);
        }catch (Exception e){
            log.error("发送Kafka信息异常：{}",e);
        }finally {
            if(null != producer){
                producer.close();
            }
        }
    }

    /**
     * 同步发送并返回
     * @param topic
     * @param message
     */
    public RecordMetadata syncSend(String topic, String message) {
        Producer<String,String> producer = createProducer();
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, message);
        RecordMetadata recordMetadata = null;
        try {
            recordMetadata = producer.send(producerRecord).get();
        }catch (Exception e){
            log.error("调用同步发送并返回结果方法异常：{}",e);
        }finally {
            if(null != producer){
                producer.close();
            }
        }
        return recordMetadata;
    }
}
